package com.xiaofan.udftest

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{Table, _}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    val inputPath = "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(3)) {
      override def extractTimestamp(element: SensorReading) = element.timestamp * 1000L
    })

    val sensorTable: Table = tabEnv.fromDataStream(dataStream, $"id", $"temperature", $"timestamp".rowtime() as "ts")

    // 调用自定义函数，对id进行hash运算
    // 1. table api
    val hashCode = new HashCode(2)
    val resultTable: Table = sensorTable
      .select($"id", $"ts", hashCode($"id"))

    // 2. sql
    tabEnv.createTemporaryView("sensor", sensorTable)
    // 注册函数
    tabEnv.createTemporaryFunction("hashCode", hashCode)

    val resultSqlTable: Table = tabEnv.sqlQuery("select id, ts, hashCode(id) from sensor")

    tabEnv.toAppendStream[Row](resultTable).print("table ")
    tabEnv.toAppendStream[Row](resultSqlTable).print("sql ")

    env.execute("function test")

  }
}


/**
 * 自定义标量函数
 */
class HashCode(factor: Int) extends ScalarFunction {

  // 这个函数名字不能变
  def eval(s: String): Int = {
    s.hashCode * factor - 10000
  }
}
